<?php
/**
 * Created by PhpStorm.
 * User: longli
 * VX: isa1589518286
 * Date: 2019/07/18
 * Time: 9:59
 * @link http://www.lmterp.cn
 */

namespace app\common\library\kafka;

use RdKafka\Producer;
use RdKafka\TopicConf;

/**
 * Kafka producer client.
 *
 * Thin wrapper around the php-rdkafka Producer: connects to the brokers given
 * in the configuration, JSON-encodes payloads and produces them to a topic.
 *
 * Recognised configuration keys:
 *  - brokers          (string|array, required) broker list, either a
 *                     comma separated string or an array of "host:port".
 *  - flush_timeout_ms (int, optional, default 10000) how long the destructor
 *                     waits for queued messages to be delivered.
 *
 * Class KafkaProducer
 * @package app\common\library\kafka
 */
class KafkaProducer
{

    /**
     * Underlying producer handle; null until init() has completed.
     * @var Producer|null
     */
    protected $producer;

    /**
     * Topic used by push() when the caller does not name one.
     * @var string
     */
    protected $defaultTopic = 'default';

    /**
     * Configuration (see the class description for the recognised keys).
     * @var array
     */
    protected $config = [];

    /**
     * Topic handles created by push(), keyed by topic name, so that a handle
     * is built once per topic instead of once per message.
     * @var array
     */
    protected $topics = [];

    /**
     * @param array $config configuration, merged over the class defaults
     * @throws \InvalidArgumentException when no broker list is configured
     */
    public function __construct(array $config = [])
    {
        $this->config = array_merge($this->config, $config);
        $this->init();
    }

    /**
     * Give queued messages a chance to be delivered before the handle goes away.
     *
     * Best effort only: a destructor must not throw, so failures are ignored
     * here. Callers that need a delivery guarantee should call flush()
     * themselves and check its result.
     */
    public function __destruct()
    {
        $timeoutMs = isset($this->config['flush_timeout_ms'])
            ? (int)$this->config['flush_timeout_ms']
            : 10000;
        try {
            $this->flush($timeoutMs);
        } catch (\Exception $e) {
            // Intentionally swallowed: throwing during destruction/shutdown is fatal.
        }
    }

    /**
     * Create a producer topic handle.
     * @param string $name topic name
     * @param array $conf topic configuration as name => value pairs
     * @return \RdKafka\ProducerTopic
     * @date 2019/07/18
     * @author longli
     */
    public function getTopic($name, array $conf = [])
    {
        $topicConf = new TopicConf();
        foreach ($conf as $k => $v) {
            $topicConf->set($k, $v);
        }
        return $this->producer->newTopic($name, $topicConf);
    }

    /**
     * Produce one message.
     *
     * The payload is JSON-encoded and handed to the producer queue; delivery
     * itself is asynchronous (see flush()).
     *
     * @param array $data message payload
     * @param string $topic topic name; the default topic is used when empty
     * @param string $key message key
     * @throws \RuntimeException when the payload cannot be JSON-encoded
     * @date 2019/07/18
     * @author longli
     */
    public function push(array $data, $topic = "", $key = "")
    {
        if (empty($topic)) {
            $topic = $this->getDefaultTopic();
        }

        // Encode first so an unencodable payload is reported instead of
        // being produced as a bogus "false" message.
        $payload = json_encode($data);
        if ($payload === false) {
            throw new \RuntimeException(
                'Unable to JSON-encode Kafka message: ' . json_last_error_msg()
            );
        }

        if (!isset($this->topics[$topic])) {
            $this->topics[$topic] = $this->getTopic($topic);
        }
        $this->topics[$topic]->produce(RD_KAFKA_PARTITION_UA, 0, $payload, $key);

        // Non-blocking poll so delivery reports / error callbacks are served.
        $this->producer->poll(0);
    }

    /**
     * Wait until all queued messages have been delivered or the timeout expires.
     * @param int $timeoutMs maximum time to wait, in milliseconds
     * @return bool true when nothing is left in the outgoing queue
     */
    public function flush($timeoutMs = 10000)
    {
        if ($this->producer === null) {
            return true;
        }

        $timeoutMs = (int)$timeoutMs;
        if (method_exists($this->producer, 'flush')) {
            return $this->producer->flush($timeoutMs) === RD_KAFKA_RESP_ERR_NO_ERROR;
        }

        // Extension builds without Producer::flush(): drain the queue by polling.
        $deadline = microtime(true) + $timeoutMs / 1000;
        while ($this->producer->getOutQLen() > 0 && microtime(true) < $deadline) {
            $this->producer->poll(50);
        }
        return $this->producer->getOutQLen() === 0;
    }

    /**
     * Initialise the producer and register the configured brokers.
     * @throws \InvalidArgumentException when no broker list is configured
     * @date 2019/07/18
     * @author longli
     */
    protected function init()
    {
        $brokers = isset($this->config['brokers']) ? $this->config['brokers'] : null;
        if (is_array($brokers)) {
            $brokers = join(',', $brokers);
        }
        // Validate before creating the handle so a bad configuration fails
        // with a clear message rather than an undefined-index notice.
        if (!is_string($brokers) || trim($brokers) === '') {
            throw new \InvalidArgumentException(
                'Kafka producer config "brokers" must be a non-empty string or array'
            );
        }

        $this->producer = new Producer();
        $this->producer->addBrokers($brokers);
    }

    /**
     * Get the effective configuration.
     * @return array
     * @date 2019/07/18
     * @author longli
     */
    public function getConfig()
    {
        return $this->config;
    }

    /**
     * Get the topic used when push() is called without a topic.
     * @return string
     */
    public function getDefaultTopic()
    {
        return $this->defaultTopic;
    }

    /**
     * Set the topic used when push() is called without a topic.
     * @param string $defaultTopic
     */
    public function setDefaultTopic($defaultTopic)
    {
        $this->defaultTopic = $defaultTopic;
    }
}